/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.inmemory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewUtil;

/**
 * Catalog implementation that uses in-memory data-structures to store the namespaces, tables and
 * views. This class doesn't touch external resources and can be utilized to write unit tests
 * without side effects. It uses {@link InMemoryFileIO}.
 *
 * <p>Namespaces, tables and views are tracked in three concurrent maps. Operations that have to
 * look at more than one map entry (rename, commit, namespace create/drop/update) additionally
 * synchronize on the catalog instance.
 */
public class InMemoryCatalog extends BaseMetastoreViewCatalog
    implements SupportsNamespaces, Closeable {
  private static final Joiner SLASH = Joiner.on("/");
  private static final Joiner DOT = Joiner.on(".");

  // Namespace -> immutable snapshot of its properties.
  private final ConcurrentMap<Namespace, Map<String, String>> namespaces;
  // Table identifier -> location of the table's current metadata file.
  private final ConcurrentMap<TableIdentifier, String> tables;
  // View identifier -> location of the view's current metadata file.
  private final ConcurrentMap<TableIdentifier, String> views;
  private FileIO io;
  private String catalogName;
  private String warehouseLocation;
  // Only assigned by initialize(); stays null until then.
  private CloseableGroup closeableGroup;

  /** Creates an empty catalog. {@link #initialize(String, Map)} must be called before use. */
  public InMemoryCatalog() {
    this.namespaces = Maps.newConcurrentMap();
    this.tables = Maps.newConcurrentMap();
    this.views = Maps.newConcurrentMap();
  }

  /** Returns the catalog name that was set by {@link #initialize(String, Map)}. */
  @Override
  public String name() {
    return catalogName;
  }

  /**
   * Configures the catalog.
   *
   * @param name catalog name; when {@code null} the simple class name is used instead
   * @param properties catalog properties; only {@link CatalogProperties#WAREHOUSE_LOCATION} is
   *     read here, defaulting to the empty string, with trailing slashes removed
   */
  @Override
  public void initialize(String name, Map<String, String> properties) {
    this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName();

    String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
    // Strip trailing slashes so that joining with "/" below never produces "//".
    this.warehouseLocation = warehouse.replaceAll("/*$", "");
    this.io = new InMemoryFileIO();
    this.closeableGroup = new CloseableGroup();
    closeableGroup.addCloseable(metricsReporter());
    closeableGroup.setSuppressCloseFailure(true);
  }

  /** Creates table operations bound to this catalog's {@link FileIO} and table map. */
  @Override
  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    return new InMemoryTableOperations(io, tableIdentifier);
  }

  /**
   * Returns the default location of a table: the namespace location joined with the table name by
   * {@code "/"}.
   */
  @Override
  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
    return SLASH.join(
        defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name());
  }

  /**
   * Returns the warehouse location for the empty namespace, otherwise the warehouse location
   * followed by the namespace levels, all joined by {@code "/"}.
   */
  private String defaultNamespaceLocation(Namespace namespace) {
    if (namespace.isEmpty()) {
      return warehouseLocation;
    } else {
      return SLASH.join(warehouseLocation, SLASH.join(namespace.levels()));
    }
  }

  /**
   * Removes a table from the catalog.
   *
   * @param tableIdentifier table to drop
   * @param purge when {@code true}, the table's current metadata is read before the table is
   *     removed and, if present, passed to {@link CatalogUtil#dropTableData} afterwards
   * @return {@code true} if the table was registered and has been removed, {@code false} otherwise
   */
  @Override
  public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
    TableOperations ops = newTableOps(tableIdentifier);
    // The metadata has to be captured before the mapping is removed; it is only needed for purge.
    TableMetadata lastMetadata = purge ? ops.current() : null;

    synchronized (this) {
      if (null == tables.remove(tableIdentifier)) {
        return false;
      }
    }

    if (purge && lastMetadata != null) {
      CatalogUtil.dropTableData(ops.io(), lastMetadata);
    }

    return true;
  }

  /**
   * Lists the tables registered directly in {@code namespace}, sorted by their string form. The
   * empty namespace lists every table in the catalog.
   *
   * @throws NoSuchNamespaceException if {@code namespace} is not empty and does not exist
   */
  @Override
  public List<TableIdentifier> listTables(Namespace namespace) {
    if (!namespaceExists(namespace) && !namespace.isEmpty()) {
      throw new NoSuchNamespaceException(
          "Cannot list tables for namespace. Namespace does not exist: %s", namespace);
    }

    return tables.keySet().stream()
        .filter(t -> namespace.isEmpty() || t.namespace().equals(namespace))
        .sorted(Comparator.comparing(TableIdentifier::toString))
        .collect(Collectors.toList());
  }

  /**
   * Moves a table's metadata location from {@code from} to {@code to}. Renaming a table to itself
   * is a no-op and performs no checks.
   *
   * @throws NoSuchNamespaceException if the target namespace does not exist
   * @throws NoSuchTableException if {@code from} is not a registered table
   * @throws AlreadyExistsException if a table or a view is already registered as {@code to}
   */
  @Override
  public void renameTable(TableIdentifier from, TableIdentifier to) {
    if (from.equals(to)) {
      return;
    }

    synchronized (this) {
      if (!namespaceExists(to.namespace())) {
        throw new NoSuchNamespaceException(
            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
      }

      String fromLocation = tables.get(from);
      if (null == fromLocation) {
        throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
      }

      if (tables.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
      }

      if (views.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
      }

      tables.put(to, fromLocation);
      tables.remove(from);
    }
  }

  /** Creates {@code namespace} without any properties. */
  @Override
  public void createNamespace(Namespace namespace) {
    createNamespace(namespace, Collections.emptyMap());
  }

  /**
   * Registers {@code namespace} with an immutable copy of {@code metadata}.
   *
   * @throws AlreadyExistsException if the namespace is already registered
   */
  @Override
  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
    synchronized (this) {
      if (namespaceExists(namespace)) {
        throw new AlreadyExistsException(
            "Cannot create namespace %s. Namespace already exists", namespace);
      }

      namespaces.put(namespace, ImmutableMap.copyOf(metadata));
    }
  }

  /** Returns whether {@code namespace} has been explicitly registered in this catalog. */
  @Override
  public boolean namespaceExists(Namespace namespace) {
    return namespaces.containsKey(namespace);
  }

  /**
   * Drops {@code namespace} if it contains neither tables nor views.
   *
   * @return {@code true} if the namespace was removed, {@code false} if it did not exist
   * @throws NamespaceNotEmptyException if the namespace still contains tables or views
   */
  @Override
  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        return false;
      }

      List<TableIdentifier> tableIdentifiers = listTables(namespace);
      if (!tableIdentifiers.isEmpty()) {
        throw new NamespaceNotEmptyException(
            "Namespace %s is not empty. Contains %d table(s).", namespace, tableIdentifiers.size());
      }

      // Views live in the namespace as well; dropping it would leave them orphaned in the map.
      List<TableIdentifier> viewIdentifiers = listViews(namespace);
      if (!viewIdentifiers.isEmpty()) {
        throw new NamespaceNotEmptyException(
            "Namespace %s is not empty. Contains %d view(s).", namespace, viewIdentifiers.size());
      }

      return namespaces.remove(namespace) != null;
    }
  }

  /**
   * Merges {@code properties} into the namespace's existing properties; on a key collision the new
   * value replaces the old one.
   *
   * @return always {@code true}
   * @throws NoSuchNamespaceException if the namespace does not exist
   */
  @Override
  public boolean setProperties(Namespace namespace, Map<String, String> properties)
      throws NoSuchNamespaceException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
      }

      namespaces.computeIfPresent(
          namespace,
          (k, v) ->
              ImmutableMap.<String, String>builder()
                  .putAll(v)
                  .putAll(properties)
                  .buildKeepingLast());

      return true;
    }
  }

  /**
   * Removes the given property keys from the namespace. Keys that are not set are ignored.
   *
   * @return always {@code true}, whether or not any key was actually present
   * @throws NoSuchNamespaceException if the namespace does not exist
   */
  @Override
  public boolean removeProperties(Namespace namespace, Set<String> properties)
      throws NoSuchNamespaceException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
      }

      namespaces.computeIfPresent(
          namespace,
          (k, v) -> {
            // The stored map is immutable, so edit a mutable copy and store a new snapshot.
            Map<String, String> newProperties = Maps.newHashMap(v);
            properties.forEach(newProperties::remove);
            return ImmutableMap.copyOf(newProperties);
          });

      return true;
    }
  }

  /**
   * Returns an immutable copy of the namespace's properties.
   *
   * @throws NoSuchNamespaceException if the namespace does not exist
   */
  @Override
  public Map<String, String> loadNamespaceMetadata(Namespace namespace)
      throws NoSuchNamespaceException {
    Map<String, String> properties = namespaces.get(namespace);
    if (properties == null) {
      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
    }

    return ImmutableMap.copyOf(properties);
  }

  /**
   * Lists the top-level namespaces, sorted by name. The result is derived from the first level of
   * every registered non-empty namespace, so a top-level namespace is listed even if only one of
   * its descendants was registered.
   */
  @Override
  public List<Namespace> listNamespaces() {
    return namespaces.keySet().stream()
        .filter(n -> !n.isEmpty())
        .map(n -> n.level(0))
        .distinct()
        .sorted()
        .map(Namespace::of)
        .collect(Collectors.toList());
  }

  /**
   * Lists the direct children of {@code namespace}, sorted by their dot-joined levels. A child is
   * listed when it, or any namespace below it, is registered. The empty namespace is always
   * considered to exist and lists the top-level namespaces.
   *
   * @throws NoSuchNamespaceException if {@code namespace} is not empty, is not registered, and is
   *     not a prefix of any registered namespace
   */
  @Override
  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
    final String[] parentLevels = namespace.levels();
    final int searchNumberOfLevels = parentLevels.length;

    // Compare level by level instead of on dot-joined strings: a level may itself contain '.',
    // and every match must have at least one level beyond the parent for the copy below.
    List<Namespace> filteredNamespaces =
        namespaces.keySet().stream()
            .filter(n -> isStrictDescendant(n, parentLevels))
            .collect(Collectors.toList());

    // If the namespace does not exist and the namespace is not a prefix of another namespace,
    // throw an exception. The empty (root) namespace is exempt, as in listTables and listViews.
    if (!namespace.isEmpty()
        && !namespaces.containsKey(namespace)
        && filteredNamespaces.isEmpty()) {
      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
    }

    return filteredNamespaces.stream()
        // List only the child-namespaces roots.
        .map(n -> Namespace.of(Arrays.copyOf(n.levels(), searchNumberOfLevels + 1)))
        .distinct()
        .sorted(Comparator.comparing(n -> DOT.join(n.levels())))
        .collect(Collectors.toList());
  }

  /**
   * Returns whether {@code candidate} has strictly more levels than {@code parentLevels} and its
   * leading levels are equal to {@code parentLevels}.
   */
  private static boolean isStrictDescendant(Namespace candidate, String[] parentLevels) {
    String[] levels = candidate.levels();
    if (levels.length <= parentLevels.length) {
      return false;
    }

    for (int i = 0; i < parentLevels.length; i += 1) {
      if (!parentLevels[i].equals(levels[i])) {
        return false;
      }
    }

    return true;
  }

  /**
   * Closes the resources registered during {@link #initialize(String, Map)} and clears all
   * namespaces, tables and views. Safe to call on a catalog that was never initialized.
   */
  @Override
  public void close() throws IOException {
    // closeableGroup is only created by initialize().
    if (closeableGroup != null) {
      closeableGroup.close();
    }

    namespaces.clear();
    tables.clear();
    views.clear();
  }

  /**
   * Lists the views registered directly in {@code namespace}, sorted by their string form. The
   * empty namespace lists every view in the catalog.
   *
   * @throws NoSuchNamespaceException if {@code namespace} is not empty and does not exist
   */
  @Override
  public List<TableIdentifier> listViews(Namespace namespace) {
    if (!namespaceExists(namespace) && !namespace.isEmpty()) {
      throw new NoSuchNamespaceException(
          "Cannot list views for namespace. Namespace does not exist: %s", namespace);
    }

    return views.keySet().stream()
        .filter(v -> namespace.isEmpty() || v.namespace().equals(namespace))
        .sorted(Comparator.comparing(TableIdentifier::toString))
        .collect(Collectors.toList());
  }

  /** Creates view operations bound to this catalog's {@link FileIO} and view map. */
  @Override
  protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
    return new InMemoryViewOperations(io, identifier);
  }

  /**
   * Removes a view from the catalog.
   *
   * @return {@code true} if the view was registered and has been removed, {@code false} otherwise
   */
  @Override
  public boolean dropView(TableIdentifier identifier) {
    synchronized (this) {
      return null != views.remove(identifier);
    }
  }

  /**
   * Moves a view's metadata location from {@code from} to {@code to}. Renaming a view to itself is
   * a no-op and performs no checks.
   *
   * @throws NoSuchNamespaceException if the target namespace does not exist
   * @throws NoSuchViewException if {@code from} is not a registered view
   * @throws AlreadyExistsException if a table or a view is already registered as {@code to}
   */
  @Override
  public void renameView(TableIdentifier from, TableIdentifier to) {
    if (from.equals(to)) {
      return;
    }

    synchronized (this) {
      if (!namespaceExists(to.namespace())) {
        throw new NoSuchNamespaceException(
            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
      }

      String fromViewLocation = views.get(from);
      if (null == fromViewLocation) {
        throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
      }

      if (tables.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
      }

      if (views.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
      }

      views.put(to, fromViewLocation);
      views.remove(from);
    }
  }

  /**
   * Table operations that read and swap a table's metadata location in the enclosing catalog's
   * {@code tables} map.
   */
  private class InMemoryTableOperations extends BaseMetastoreTableOperations {
    private final FileIO fileIO;
    private final TableIdentifier tableIdentifier;
    private final String fullTableName;

    InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
      this.fileIO = fileIO;
      this.tableIdentifier = tableIdentifier;
      this.fullTableName = fullTableName(catalogName, tableIdentifier);
    }

    /**
     * Refreshes from the metadata location currently registered for the table, or calls {@code
     * disableRefresh()} when the table is not registered.
     */
    @Override
    public void doRefresh() {
      String latestLocation = tables.get(tableIdentifier);
      if (latestLocation == null) {
        disableRefresh();
      } else {
        refreshFromMetadataLocation(latestLocation);
      }
    }

    /**
     * Writes the new metadata and then swaps the registered metadata location, provided the
     * location currently registered still equals the one of {@code base}.
     *
     * @param base metadata the commit is based on, or {@code null} when creating the table
     * @param metadata metadata to commit
     * @throws NoSuchNamespaceException if the table is being created in a missing namespace
     * @throws AlreadyExistsException if a view with the same identifier exists, or the table is
     *     being created but is already registered
     * @throws NoSuchTableException if the table is being updated but is no longer registered
     * @throws CommitFailedException if the registered location differs from the base location
     */
    @Override
    public void doCommit(TableMetadata base, TableMetadata metadata) {
      // The metadata is written before the catalog lock is taken; only the swap is guarded.
      String newLocation = writeNewMetadataIfRequired(base == null, metadata);
      String oldLocation = base == null ? null : base.metadataFileLocation();

      synchronized (InMemoryCatalog.this) {
        if (null == base && !namespaceExists(tableIdentifier.namespace())) {
          throw new NoSuchNamespaceException(
              "Cannot create table %s. Namespace does not exist: %s",
              tableIdentifier, tableIdentifier.namespace());
        }

        if (views.containsKey(tableIdentifier)) {
          throw new AlreadyExistsException(
              "View with same name already exists: %s", tableIdentifier);
        }

        tables.compute(
            tableIdentifier,
            (k, existingLocation) -> {
              // Compare-and-swap on the metadata location; a mismatch means someone else won.
              if (!Objects.equal(existingLocation, oldLocation)) {
                if (null == base) {
                  throw new AlreadyExistsException("Table already exists: %s", tableName());
                }

                if (null == existingLocation) {
                  throw new NoSuchTableException("Table does not exist: %s", tableName());
                }

                throw new CommitFailedException(
                    "Cannot commit to table %s metadata location from %s to %s "
                        + "because it has been concurrently modified to %s",
                    tableIdentifier, oldLocation, newLocation, existingLocation);
              }
              return newLocation;
            });
      }
    }

    /** Returns the {@link FileIO} this instance was constructed with. */
    @Override
    public FileIO io() {
      return fileIO;
    }

    /** Returns the full table name computed at construction time. */
    @Override
    protected String tableName() {
      return fullTableName;
    }
  }

  /**
   * View operations that read and swap a view's metadata location in the enclosing catalog's
   * {@code views} map.
   */
  private class InMemoryViewOperations extends BaseViewOperations {
    private final FileIO io;
    private final TableIdentifier identifier;
    private final String fullViewName;

    InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
      this.io = io;
      this.identifier = identifier;
      this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
    }

    /**
     * Refreshes from the metadata location currently registered for the view, or calls {@code
     * disableRefresh()} when the view is not registered.
     */
    @Override
    public void doRefresh() {
      String latestLocation = views.get(identifier);
      if (latestLocation == null) {
        disableRefresh();
      } else {
        refreshFromMetadataLocation(latestLocation);
      }
    }

    /**
     * Writes the new metadata and then swaps the registered metadata location, provided the
     * location currently registered still equals the current metadata location of these
     * operations.
     *
     * @param base metadata the commit is based on, or {@code null} when creating the view
     * @param metadata metadata to commit
     * @throws NoSuchNamespaceException if the view is being created in a missing namespace
     * @throws AlreadyExistsException if a table with the same identifier exists, or the view is
     *     being created but is already registered
     * @throws NoSuchViewException if the view is being updated but is no longer registered
     * @throws CommitFailedException if the registered location differs from the expected one
     */
    @Override
    public void doCommit(ViewMetadata base, ViewMetadata metadata) {
      // The metadata is written before the catalog lock is taken; only the swap is guarded.
      String newLocation = writeNewMetadataIfRequired(metadata);
      String oldLocation = base == null ? null : currentMetadataLocation();

      synchronized (InMemoryCatalog.this) {
        if (null == base && !namespaceExists(identifier.namespace())) {
          throw new NoSuchNamespaceException(
              "Cannot create view %s. Namespace does not exist: %s",
              identifier, identifier.namespace());
        }

        if (tables.containsKey(identifier)) {
          throw new AlreadyExistsException("Table with same name already exists: %s", identifier);
        }

        views.compute(
            identifier,
            (k, existingLocation) -> {
              // Compare-and-swap on the metadata location; a mismatch means someone else won.
              if (!Objects.equal(existingLocation, oldLocation)) {
                if (null == base) {
                  throw new AlreadyExistsException("View already exists: %s", identifier);
                }

                if (null == existingLocation) {
                  throw new NoSuchViewException("View does not exist: %s", identifier);
                }

                throw new CommitFailedException(
                    "Cannot commit to view %s metadata location from %s to %s "
                        + "because it has been concurrently modified to %s",
                    identifier, oldLocation, newLocation, existingLocation);
              }

              return newLocation;
            });
      }
    }

    /** Returns the {@link FileIO} this instance was constructed with. */
    @Override
    public FileIO io() {
      return io;
    }

    /** Returns the full view name computed at construction time. */
    @Override
    protected String viewName() {
      return fullViewName;
    }
  }
}
